﻿using System.Collections.Generic;
using System.Threading;
using System.Reflection;
using System;
using WebSocketSharp;
using WebSocketSharp.Net;
using WebSocketSharp.Server;
using SimpleJSON;
using UnityEngine;

/**
 * This class handles the connection with the external ROS world, deserializing
 * json messages into appropriate instances of packets and messages.
 * 
 * This class also provides a mechanism for having the callback's exectued on the rendering thread.
 * (Remember, Unity has a single rendering thread, so we want to do all of the communications stuff away
 * from that. 
 * 
 * The one other clever thing that is done here is that we only keep 1 (the most recent!) copy of each message type
 * that comes along.
 * 
 * Version History
 * 3.1 - changed methods to start with an upper case letter to be more consistent with c#
 * style.
 * 3.0 - modification from hand crafted version 2.0
 * 
 * @author Michael Jenkin, Robert Codd-Downey and Andrew Speers
 * @version 3.1
 * 
 * Modificated by LidarUncle
 * 2017.4
 */

namespace ROSBridgeLib
{
    public class ROSBridgeWebSocketConnection
    {
        private class RenderTask
        {
            private Type _subscriber;
            private string _topic;
            private ROSBridgeMsg _msg;

            public RenderTask(Type subscriber, string topic, ROSBridgeMsg msg)
            {
                _subscriber = subscriber;
                _topic = topic;
                _msg = msg;
            }

            public Type getSubscriber()
            {
                return _subscriber;
            }

            public ROSBridgeMsg getMsg()
            {
                return _msg;
            }

            public string getTopic()
            {
                return _topic;
            }

        };

        private string _host;
        private int _port;
        private WebSocket _ws;
        private System.Threading.Thread _myThread;

       

        private List<Type> _subscribers;
        // our subscribers
        private List<Type> _publishers;
        //our publishers
        private Type _serviceResponse;
        // to deal with service responses
        private string _serviceName = null;
        private string _serviceValues = null;
        private List<RenderTask> _taskQ = new List<RenderTask>();

       


        private object _queueLock = new object();

        private object _messageLock = new object();

        private Queue<string> smallMessages = new Queue<string>();

        private Thread smallMessageThread;

        private object smallMessageLock = new object();

       


        private string nextMiddleMessage;

        private Thread middleMessageThread;

        private object middleMessageLock = new object();


        private string nextBigMessage;

        private Thread bigMessageThread;

        private object bigMessageLock = new object();


        public delegate void OnConnect();

        public OnConnect onConnnect;


        public delegate void OnError();

        public OnError onError;


        public delegate void OnDisconnect();

        public OnDisconnect onDisconnect;





        private static string GetMessageType(Type t)
        {
            return (string)t.GetMethod("GetMessageType", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy).Invoke(null, null);
        }

        private static string GetMessageTopic(Type t)
        {
            return (string)t.GetMethod("GetMessageTopic", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy).Invoke(null, null);
        }

        private static ROSBridgeMsg ParseMessage(Type t, JSONNode node)
        {
            return (ROSBridgeMsg)t.GetMethod("ParseMessage", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy).Invoke(null, new object[] { node });
        }

        private static void Update(Type t, ROSBridgeMsg msg)
        {
            t.GetMethod("CallBack", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy).Invoke(null, new object[] { msg });
        }

        private static void ServiceResponse(Type t, string service, string yaml)
        {
            t.GetMethod("ServiceCallBack", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy).Invoke(null, new object[] { service, yaml });
        }

        private static void IsValidServiceResponse(Type t)
        {
            if (t.GetMethod("ServiceCallBack", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("invalid service response handler");
        }

        private static void IsValidSubscriber(Type t)
        {
            if (t.GetMethod("CallBack", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("missing Callback method");
            if (t.GetMethod("GetMessageType", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("missing GetMessageType method");
            if (t.GetMethod("GetMessageTopic", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("missing GetMessageTopic method");
            if (t.GetMethod("ParseMessage", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("missing ParseMessage method");
        }

        private static void IsValidPublisher(Type t)
        {
            if (t.GetMethod("GetMessageType", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("missing GetMessageType method");
            if (t.GetMethod("GetMessageTopic", BindingFlags.Public | BindingFlags.Static | BindingFlags.FlattenHierarchy) == null)
                throw new Exception("missing GetMessageTopic method");
        }


        /**
		 * Make a connection to a host/port. 
		 * This does not actually start the connection, use Connect to do that.
		 */

        public ROSBridgeWebSocketConnection(string host, int port)
        {
            _host = host;
            _port = port;
            _myThread = null;
            _subscribers = new List<Type>();
            _publishers = new List<Type>();
        }

        /**
		 * Add a service response callback to this connection.
		 */
        public void AddServiceResponse(Type serviceResponse)
        {
            IsValidServiceResponse(serviceResponse);
            _serviceResponse = serviceResponse;
        }

        /**
		 * Add a subscriber callback to this connection. There can be many subscribers.
		 */
        public void AddSubscriber(Type subscriber)
        {
            IsValidSubscriber(subscriber);
            _subscribers.Add(subscriber);
        }

        /**
		 * Add a publisher to this connection. There can be many publishers.
		 */
        public void AddPublisher(Type publisher)
        {
            IsValidPublisher(publisher);
            _publishers.Add(publisher);
        }

        /**
		 * Connect to the remote ros environment.
		 */
        public void Connect()
        {

            _ws = new WebSocket(_host + ":" + _port);



            smallMessageThread = new System.Threading.Thread(SmallMessageProcessor);
            smallMessageThread.Start();


            middleMessageThread = new System.Threading.Thread(MiddleMessageProcessor);
            middleMessageThread.Start();


            bigMessageThread = new System.Threading.Thread(BigMessageProcessor);
            bigMessageThread.Start();

            _ws.OnMessage += (sender, e) => this.OnMessage(e.Data);

            
            _ws.OnOpen += (sender, e) =>
            {
                   
                onConnnect.Invoke();

            };


           
            _ws.OnError += (sender, e) =>
            {
                onError.Invoke();
            };

            _ws.OnClose += (sender, e) =>
            {
                onDisconnect.Invoke();

            };


            _ws.Connect();

            _myThread = new System.Threading.Thread(Run);

            _myThread.Start();
          

        }

        public bool isAlive()
        {


            if (_ws == null)
            {
                return false;
            }
            return _ws.IsAlive;



        }

        /**
		 * Disconnect from the remote ros environment.
		 */
        public void Disconnect()
        {
            
            _myThread.Abort();

            smallMessageThread.Abort();
            middleMessageThread.Abort();
            bigMessageThread.Abort();

       

            if (_ws != null && _ws.IsAlive==true)
            {

                foreach (Type p in _subscribers)
                {
                    _ws.Send(ROSBridgeMsg.UnSubscribe(GetMessageTopic(p)));
                    Debug.Log("Sending " + ROSBridgeMsg.UnSubscribe(GetMessageTopic(p)));
                }
                foreach (Type p in _publishers)
                {
                    _ws.Send(ROSBridgeMsg.UnAdvertise(GetMessageTopic(p)));
                    Debug.Log("Sending " + ROSBridgeMsg.UnAdvertise(GetMessageTopic(p)));
                }

                _ws.Close();
            }

           
        }



        private void SmallMessageProcessor()
        {


            while (true)
            {


                if (smallMessages.Count > 0)
                {
                    string message;

                    lock (smallMessageLock)
                    {
                        message = smallMessages.Dequeue();

                        ProcessMsg(message);
                    }
                     
                }
                else
                {
                    Thread.Sleep(300);
                }
            }

        }


        private void MiddleMessageProcessor()
        {



            while (true)
            {

                if (nextMiddleMessage != null)
                {
                    string message;

                    lock (middleMessageLock)
                    {
                        message = nextMiddleMessage;
                        nextMiddleMessage = null;

                    }
                    ProcessMsg(message);

                }
                else
                {
                    Thread.Sleep(300);
                }
            }

        }


        private void BigMessageProcessor()
        {



            while (true)
            {

                if (nextBigMessage != null)
                {
                    
                    string message;
                    lock (bigMessageLock)
                    {
                        
                        message = nextBigMessage;
                        nextBigMessage = null;

                    }

                    ProcessMsg(message);


                }
                else
                {
                    Thread.Sleep(300);
                }


            }

        }

        private void Run()
        {
            
//            _ws.OnMessage += (sender, e) => this.OnMessage(e.Data);


            foreach (Type p in _subscribers)
            {
                _ws.Send(ROSBridgeMsg.Subscribe(GetMessageTopic(p), GetMessageType(p)));
                Debug.Log("Sending " + ROSBridgeMsg.Subscribe(GetMessageTopic(p), GetMessageType(p)));
            }
            foreach (Type p in _publishers)
            {
                _ws.Send(ROSBridgeMsg.Advertise(GetMessageTopic(p), GetMessageType(p)));
                Debug.Log("Sending " + ROSBridgeMsg.Advertise(GetMessageTopic(p), GetMessageType(p)));
            }
            while (true)
            {
//                Debug.Log("Run :"+Thread.CurrentThread.Name+" at "+ System.DateTime.Now.Ticks);

                Thread.Sleep(1000);

//                Debug.Log("_taskQ:"+_taskQ.Count);

//                Debug.Log("_MessageQueue:"+_messages.Count);

                //int workThread;
                //int ioThread;

         

                // System.Threading.ThreadPool.GetAvailableThreads(out workThread,out ioThread);

                // Debug.Log("work Thread:"+workThread+" ioThread:"+ioThread);

            }
        }




        private void ProcessMsg(string s)
        {
            
            long startTime = System.DateTime.Now.Ticks;

            JSONNode node = JSONNode.Parse(s);

            long cost = (System.DateTime.Now.Ticks - startTime) / 10000;

            string op = node["op"];

            if ("publish".Equals(op))
            {

                foreach (Type p in _subscribers)
                {

                    string topic = node["topic"];

//                    Debug.Log(topic+" "+ s.Length);


                    if (topic.Equals(GetMessageTopic(p)))
                    {

                        ROSBridgeMsg msg = ParseMessage(p, node["msg"]);
                        RenderTask newTask = new RenderTask(p, topic, msg);
                        lock (_queueLock)
                        {
                           
//                            Thread.Sleep(10);

                            bool found = false;
                            for (int i = 0; i < _taskQ.Count; i++)
                            {
                                if (_taskQ[i].getTopic().Equals(topic))
                                {
                                    _taskQ.RemoveAt(i);
                                    _taskQ.Insert(i, newTask);
                                    found = true;
                                    break;
                                }
                            }
                            if (!found)
                            {

                                _taskQ.Add(newTask);
                            }

                        }

                    }
                }

            }
            else if ("service_response".Equals(op))
            {
                Debug.Log("Got service response " + node.ToString());
                _serviceName = node["service"];
                _serviceValues = (node["values"] == null) ? "" : node["values"].ToString();
            }
            else
            {
                Debug.Log("Must write code here for other messages");
            }





        }


       
        private void OnMessage(string s)
        {

            int length = s.Length;



            Debug.Log(s.Substring(0, s.IndexOf(",")));

              

            Debug.Log("recevied message:"+length);


            if (length < 500)
            {
                lock (smallMessageLock)
                {
                    
                    smallMessages.Enqueue(s);

                }

            }
            else if (length >= 500 && length < 40000)
            {

                lock (middleMessageLock)
                {

                    nextMiddleMessage = s;

                }


            }
            else if (length >= 40000)
            {

                lock (bigMessageLock)
                {

                    nextBigMessage = s;
                }

            }

//            maxMessageSize=Mathf.Max(maxMessageSize,s.Length);

//            _messages.Enqueue(s);

//            System.Object c= new System.Object();
//           
//            ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadProc),(System.Object)s);


        }

        public void Render()
        {

          
            RenderTask newTask = null;
            lock (_queueLock)
            {
                if (_taskQ.Count > 0)
                {
                  
                    newTask = _taskQ[0];
                    _taskQ.RemoveAt(0);
                }
            }
            if (newTask != null)
            {
                            
                Update(newTask.getSubscriber(), newTask.getMsg());

            }

            if (_serviceName != null)
            {
                
                ServiceResponse(_serviceResponse, _serviceName, _serviceValues);

                _serviceName = null;
            }


        }

        public void Publish(String topic, ROSBridgeMsg msg)
        {
            if (_ws != null)
            {
                string s = ROSBridgeMsg.Publish(topic, msg.ToYAMLString());
                Debug.Log("Sending " + s);
                _ws.Send(s);
            }
        }

        public void CallService(string service, string args)
        {
            if (_ws != null)
            {
                string s = ROSBridgeMsg.CallService(service, args);
                Debug.Log("Sending " + s);
                _ws.Send(s);
            }
        }
    }
}
